/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.phoenix.mapreduce.index;

import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.ROW_KEY_SEPARATOR;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.phoenix.compile.PostIndexDDLCompiler;
import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
import org.apache.phoenix.hbase.index.AbstractValueGetter;
import org.apache.phoenix.hbase.index.ValueGetter;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixResultSet;
import org.apache.phoenix.mapreduce.CsvBulkImportUtil;
import org.apache.phoenix.mapreduce.PhoenixServerBuildIndexInputFormat;
import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.SourceTable;
import org.apache.phoenix.mapreduce.index.SourceTargetColumnNames.DataSourceColNames;
import org.apache.phoenix.mapreduce.util.ColumnInfoToStringEncoderDecoder;
import org.apache.phoenix.mapreduce.util.ConnectionUtil;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
import org.apache.phoenix.parse.HintNode.Hint;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTable.IndexType;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.types.PVarchar;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.ColumnInfo;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.EquiDepthStreamHistogram;
import org.apache.phoenix.util.EquiDepthStreamHistogram.Bucket;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TransactionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.phoenix.thirdparty.com.google.common.base.Strings;
import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLineParser;
import org.apache.phoenix.thirdparty.org.apache.commons.cli.DefaultParser;
import org.apache.phoenix.thirdparty.org.apache.commons.cli.HelpFormatter;
import org.apache.phoenix.thirdparty.org.apache.commons.cli.Option;
import org.apache.phoenix.thirdparty.org.apache.commons.cli.Options;
import org.apache.phoenix.thirdparty.org.apache.commons.cli.ParseException;

/**
 * An MR job to populate the index table from the data table.
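 * <p>
 * A typical invocation looks like the following (illustrative only; schema, table, and path
 * values are examples, not defaults):
 *
 * <pre>
 * hbase org.apache.phoenix.mapreduce.index.IndexTool \
 *   --schema MY_SCHEMA --data-table MY_TABLE --index-table MY_INDEX \
 *   --verify BEFORE --output-path /tmp/index-tool-output
 * </pre>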
 */
public class IndexTool extends Configured implements Tool {
  public enum IndexVerifyType {
    BEFORE("BEFORE"),
    AFTER("AFTER"),
    BOTH("BOTH"),
    ONLY("ONLY"),
    NONE("NONE");

    private final String value;
    private final byte[] valueBytes;

    IndexVerifyType(String value) {
      this.value = value;
      this.valueBytes = PVarchar.INSTANCE.toBytes(value);
    }

    public String getValue() {
      return this.value;
    }

    public byte[] toBytes() {
      return this.valueBytes;
    }

    public static IndexVerifyType fromValue(String value) {
      for (IndexVerifyType verifyType : IndexVerifyType.values()) {
        if (value.equals(verifyType.getValue())) {
          return verifyType;
        }
      }
      throw new IllegalStateException("Invalid value: " + value + " for " + IndexVerifyType.class);
    }

    public static IndexVerifyType fromValue(byte[] value) {
      return fromValue(Bytes.toString(value));
    }
  }

  public enum IndexDisableLoggingType {
    NONE("NONE"),
    BEFORE("BEFORE"),
    AFTER("AFTER"),
    BOTH("BOTH");

    private final String value;
    private final byte[] valueBytes;

    IndexDisableLoggingType(String value) {
      this.value = value;
      this.valueBytes = PVarchar.INSTANCE.toBytes(value);
    }

    public String getValue() {
      return this.value;
    }

    public byte[] toBytes() {
      return this.valueBytes;
    }

    public static IndexDisableLoggingType fromValue(String value) {
      for (IndexDisableLoggingType disableLoggingType : IndexDisableLoggingType.values()) {
        if (value.equals(disableLoggingType.getValue())) {
          return disableLoggingType;
        }
      }
      throw new IllegalStateException(
        "Invalid value: " + value + " for " + IndexDisableLoggingType.class);
    }

    public static IndexDisableLoggingType fromValue(byte[] value) {
      return fromValue(Bytes.toString(value));
    }
  }

  private static final Logger LOGGER = LoggerFactory.getLogger(IndexTool.class);

  // The raw identifiers as passed in, with the escaping used in SQL
  // (double quotes for case sensitivity)
  private String schemaName;
  private String dataTable;
  private String indexTable;
  private String dataTableWithSchema;
  private String indexTableWithSchema;

  private boolean isPartialBuild, isForeground;
  private IndexVerifyType indexVerifyType = IndexVerifyType.NONE;
  private IndexDisableLoggingType disableLoggingType = IndexDisableLoggingType.NONE;
  private SourceTable sourceTable = SourceTable.DATA_TABLE_SOURCE;
  // The qualified normalized table names (no double quotes, case same as HBase table)
  private String qDataTable; // normalized with schema
  private String qIndexTable; // normalized with schema
  private String qSchemaName;
  private boolean useSnapshot;
  private boolean isLocalIndexBuild = false;
  private boolean shouldDeleteBeforeRebuild;
  private PTable pIndexTable = null;
  private PTable pDataTable;
  private String tenantId = null;
  private Job job;
  private Long startTime, endTime, lastVerifyTime;
  private IndexType indexType;
  private String basePath;
  byte[][] splitKeysBeforeJob = null;
  Configuration configuration;

  private static final Option SCHEMA_NAME_OPTION =
    new Option("s", "schema", true, "Phoenix schema name (optional)");
  private static final Option DATA_TABLE_OPTION =
    new Option("dt", "data-table", true, "Data table name (mandatory)");
  private static final Option INDEX_TABLE_OPTION = new Option("it", "index-table", true,
    "Index table name (not required in case of partial rebuilding)");

  private static final Option PARTIAL_REBUILD_OPTION = new Option("pr", "partial-rebuild", false,
    "To build indexes for a data table from the earliest disabled timestamp");

  private static final Option DIRECT_API_OPTION = new Option("direct", "direct", false,
    "This parameter is deprecated. Direct mode will be used whether it is set or not. Keeping it for backwards compatibility.");

  private static final Option VERIFY_OPTION = new Option("v", "verify", true,
    "To verify every data row has a corresponding row of a global index. For other types of indexes, "
      + "this option will be silently ignored. The accepted values are NONE, ONLY, BEFORE, AFTER, and BOTH. "
      + "NONE is for no inline verification, which is also the default for this option. ONLY is for "
      + "verifying without rebuilding index rows. The rest are for verifying before, after, or both before "
      + "and after rebuilding rows. If the verification is done before rebuilding rows, the index rows "
      + "that are verified to be correct will not be rebuilt");

  private static final double DEFAULT_SPLIT_SAMPLING_RATE = 10.0;

  private static final Option SPLIT_INDEX_OPTION = new Option("sp", "split", true,
    "Split the index table before building, to have the same # of regions as the data table. "
      + "The data table is sampled to get uniform index splits across the index values. "
      + "Takes an optional argument specifying the sampling rate, otherwise defaults to "
      + DEFAULT_SPLIT_SAMPLING_RATE);

  private static final int DEFAULT_AUTOSPLIT_NUM_REGIONS = 20;

  private static final Option AUTO_SPLIT_INDEX_OPTION = new Option("spa", "autosplit", true,
    "Automatically split the index table if the # of data table regions is greater than N. "
      + "Takes an optional argument specifying N, otherwise defaults to "
      + DEFAULT_AUTOSPLIT_NUM_REGIONS
      + ". Can be used in conjunction with the -split option to specify the sampling rate");

  private static final Option RUN_FOREGROUND_OPTION =
    new Option("runfg", "run-foreground", false, "Applicable on top of the -direct option. "
      + "If specified, runs the index build in the foreground. Default: runs the build in the background.");
  private static final Option OUTPUT_PATH_OPTION =
    new Option("op", "output-path", true, "Output path where the files are written");
  private static final Option SNAPSHOT_OPTION = new Option("snap", "snapshot", false,
    "If specified, uses Snapshots for async index building (optional)");
  private static final Option TENANT_ID_OPTION = new Option("tenant", "tenant-id", true,
    "If specified, uses Tenant connection for tenant view index building (optional)");

  private static final Option DELETE_ALL_AND_REBUILD_OPTION =
    new Option("deleteall", "delete-all-and-rebuild", false,
      "Applicable only to global indexes on tables, not to local or view indexes. "
        + "If specified, truncates the index table and rebuilds (optional)");

  private static final Option HELP_OPTION = new Option("h", "help", false, "Help");
  private static final Option START_TIME_OPTION =
    new Option("st", "start-time", true, "Start time for indextool rebuild or verify");
  private static final Option END_TIME_OPTION =
    new Option("et", "end-time", true, "End time for indextool rebuild or verify");

  private static final Option RETRY_VERIFY_OPTION = new Option("rv", "retry-verify", true,
    "Max scan ts of the last rebuild/verify that needs to be retried incrementally");

  private static final Option DISABLE_LOGGING_OPTION = new Option("dl", "disable-logging", true,
    "Disable logging of failed verification rows for BEFORE, " + "AFTER, or BOTH verify jobs");

  private static final Option USE_INDEX_TABLE_AS_SOURCE_OPTION =
    new Option("fi", "from-index", false,
      "To verify every row in the index table has a corresponding row in the data table. "
        + "Only supported for global indexes. If this option is used with -v AFTER, these "
        + "extra rows will be identified but not repaired.");

  public static final String INDEX_JOB_NAME_TEMPLATE = "PHOENIX_%s.%s_INDX_%s";

  public static final String INVALID_TIME_RANGE_EXCEPTION_MESSAGE = "startTime is greater than "
    + "or equal to endTime, or either of them is set in the future; IndexTool can't proceed.";

  public static final String FEATURE_NOT_APPLICABLE =
    "start-time/end-time and retry-verify features are only "
      + "applicable for local or non-transactional global indexes";

  public static final String RETRY_VERIFY_NOT_APPLICABLE = "retry-verify feature accepts a "
    + "non-zero ts set in the past, and the ts must be present in the PHOENIX_INDEX_TOOL_RESULT table";

  private Options getOptions() {
    final Options options = new Options();
    options.addOption(SCHEMA_NAME_OPTION);
    options.addOption(DATA_TABLE_OPTION);
    options.addOption(INDEX_TABLE_OPTION);
    options.addOption(PARTIAL_REBUILD_OPTION);
    options.addOption(DIRECT_API_OPTION);
    options.addOption(VERIFY_OPTION);
    options.addOption(RUN_FOREGROUND_OPTION);
    options.addOption(OUTPUT_PATH_OPTION);
    options.addOption(SNAPSHOT_OPTION);
    options.addOption(TENANT_ID_OPTION);
    options.addOption(DELETE_ALL_AND_REBUILD_OPTION);
    options.addOption(HELP_OPTION);
    AUTO_SPLIT_INDEX_OPTION.setOptionalArg(true);
    SPLIT_INDEX_OPTION.setOptionalArg(true);
    START_TIME_OPTION.setOptionalArg(true);
    END_TIME_OPTION.setOptionalArg(true);
    RETRY_VERIFY_OPTION.setOptionalArg(true);
    options.addOption(AUTO_SPLIT_INDEX_OPTION);
    options.addOption(SPLIT_INDEX_OPTION);
    options.addOption(START_TIME_OPTION);
    options.addOption(END_TIME_OPTION);
    options.addOption(RETRY_VERIFY_OPTION);
    options.addOption(DISABLE_LOGGING_OPTION);
    options.addOption(USE_INDEX_TABLE_AS_SOURCE_OPTION);
    return options;
  }

  /**
   * Parses the commandline arguments, throws IllegalStateException if mandatory arguments are
   * missing.
   * @param args supplied command line arguments
   * @return the parsed command line
   */
  @VisibleForTesting
  public CommandLine parseOptions(String[] args) {

    final Options options = getOptions();

    CommandLineParser parser = DefaultParser.builder().setAllowPartialMatching(false)
      .setStripLeadingAndTrailingQuotes(false).build();
    CommandLine cmdLine = null;
    try {
      cmdLine = parser.parse(options, args);
    } catch (ParseException e) {
      printHelpAndExit("Error parsing command line options: " + e.getMessage(), options);
    }

    if (cmdLine.hasOption(HELP_OPTION.getOpt())) {
      printHelpAndExit(options, 0);
    }

    if (!cmdLine.hasOption(DATA_TABLE_OPTION.getOpt())) {
      throw new IllegalStateException(
        DATA_TABLE_OPTION.getLongOpt() + " is a mandatory " + "parameter");
    }

    if (
      cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt())
        && cmdLine.hasOption(INDEX_TABLE_OPTION.getOpt())
    ) {
      throw new IllegalStateException(
        "Index name should not be passed with " + PARTIAL_REBUILD_OPTION.getLongOpt());
    }

    if (
      !cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt())
        && !cmdLine.hasOption(INDEX_TABLE_OPTION.getOpt())
    ) {
      throw new IllegalStateException(
        "Index name should be passed unless it is a partial rebuild.");
    }

    if (
      cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt())
        && cmdLine.hasOption(DELETE_ALL_AND_REBUILD_OPTION.getOpt())
    ) {
      throw new IllegalStateException(DELETE_ALL_AND_REBUILD_OPTION.getLongOpt()
        + " is not compatible with " + PARTIAL_REBUILD_OPTION.getLongOpt());
    }

    boolean splitIndex = cmdLine.hasOption(AUTO_SPLIT_INDEX_OPTION.getOpt())
      || cmdLine.hasOption(SPLIT_INDEX_OPTION.getOpt());
    if (splitIndex && !cmdLine.hasOption(INDEX_TABLE_OPTION.getOpt())) {
      throw new IllegalStateException("Must pass an index name for the split index option");
    }
    if (splitIndex && cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt())) {
      throw new IllegalStateException(
        "Cannot split index for a partial rebuild, as the index table is dropped");
    }
    if (loggingDisabledMismatchesVerifyOption(cmdLine)) {
      throw new IllegalStateException("Can't disable index verification logging when no "
        + "index verification or the wrong kind of index verification has been requested. "
        + "VerifyType: [" + cmdLine.getOptionValue(VERIFY_OPTION.getOpt()) + "] and "
        + "DisableLoggingType: [" + cmdLine.getOptionValue(DISABLE_LOGGING_OPTION.getOpt()) + "]");
    }
    return cmdLine;
  }

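  /**
   * Returns true if the requested disable-logging type is incompatible with the requested verify
   * type. Logging can only be disabled for a verification phase that actually runs, e.g.
   * disabling AFTER logging requires a verify type that includes an after-rebuild pass (AFTER or
   * BOTH).
   */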
  private boolean loggingDisabledMismatchesVerifyOption(CommandLine cmdLine) {
    boolean loggingDisabled = cmdLine.hasOption(DISABLE_LOGGING_OPTION.getOpt());
    if (!loggingDisabled) {
      return false;
    }
    boolean hasVerifyOption = cmdLine.hasOption(VERIFY_OPTION.getOpt());
    if (!hasVerifyOption) {
      return true;
    }
    String loggingDisableValue = cmdLine.getOptionValue(DISABLE_LOGGING_OPTION.getOpt());
    String verifyValue = cmdLine.getOptionValue(VERIFY_OPTION.getOpt());
    IndexDisableLoggingType loggingDisableType =
      IndexDisableLoggingType.fromValue(loggingDisableValue);
    IndexVerifyType verifyType = IndexVerifyType.fromValue(verifyValue);
    // error if we're trying to disable logging when we're not doing any verification
    if (verifyType.equals(IndexVerifyType.NONE)) {
      return true;
    }
    // error if we're disabling logging after rebuild but we're not verifying after rebuild
    if (
      (verifyType.equals(IndexVerifyType.BEFORE) || verifyType.equals(IndexVerifyType.ONLY))
        && loggingDisableType.equals(IndexDisableLoggingType.AFTER)
    ) {
      return true;
    }
    // error if we're disabling logging before rebuild but we're not verifying before rebuild
    if (
      (verifyType.equals(IndexVerifyType.AFTER))
        && loggingDisableType.equals(IndexDisableLoggingType.BEFORE)
    ) {
      return true;
    }
    if (
      loggingDisableType.equals(IndexDisableLoggingType.BOTH)
        && !verifyType.equals(IndexVerifyType.BOTH)
    ) {
      return true;
    }
    return false;
  }

  private void printHelpAndExit(String errorMessage, Options options) {
    System.err.println(errorMessage);
    printHelpAndExit(options, 1);
  }

  private void printHelpAndExit(Options options, int exitCode) {
    HelpFormatter formatter = new HelpFormatter();
    formatter.printHelp("help", options);
    System.exit(exitCode);
  }

  public Long getStartTime() {
    return startTime;
  }

  public Long getEndTime() {
    return endTime;
  }

  public Long getLastVerifyTime() {
    return lastVerifyTime;
  }

  public IndexTool.IndexDisableLoggingType getDisableLoggingType() {
    return disableLoggingType;
  }

  public IndexScrutinyTool.SourceTable getSourceTable() {
    return sourceTable;
  }

  class JobFactory {
    Connection connection;
    Configuration configuration;
    private Path outputPath;
    private FileSystem fs;

    public JobFactory(Connection connection, Configuration configuration, Path outputPath) {
      this.connection = connection;
      this.configuration = configuration;
      this.outputPath = outputPath;
    }

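    // Dispatches to one of three build strategies:
    // 1) a partial rebuild of all indexes in BUILDING state on the data table,
    // 2) a client-side MR build, used for snapshot-based and transactional global index builds,
    // 3) a server-side build, used for local and non-transactional global indexes.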
    public Job getJob() throws Exception {
      if (isPartialBuild) {
        return configureJobForPartialBuild();
      } else {
        long maxTimeRange = pIndexTable.getTimeStamp() + 1;
        // this is set to ensure index tables remain consistent post population.
        if (pDataTable.isTransactional()) {
          configuration.set(PhoenixConfigurationUtil.TX_SCN_VALUE,
            Long.toString(TransactionUtil.convertToNanoseconds(maxTimeRange)));
          configuration.set(PhoenixConfigurationUtil.TX_PROVIDER,
            pDataTable.getTransactionProvider().name());
        }
        if (useSnapshot || (!isLocalIndexBuild && pDataTable.isTransactional())) {
          PhoenixConfigurationUtil.setCurrentScnValue(configuration, maxTimeRange);
          if (indexVerifyType != IndexVerifyType.NONE) {
            LOGGER.warn("Verification is not supported for snapshots and transactional"
              + "table index rebuilds, verification parameter ignored");
          }
          return configureJobForAsyncIndex();
        } else {
          // Local and non-transactional global indexes are built on the server side.
          // It is safe not to set CURRENT_SCN_VALUE for server-side rebuilds: this makes sure
          // that all the rows that exist so far will be rebuilt. The current time of the servers
          // will be used to set the time range for server-side scans.

          // However, PHOENIX-5732 introduced an optional endTime parameter for IndexTool.
          // When endTime is passed for local and non-tx global indexes, we'll override
          // CURRENT_SCN_VALUE.
          if (endTime != null) {
            PhoenixConfigurationUtil.setCurrentScnValue(configuration, endTime);
          }
          if (lastVerifyTime != null) {
            PhoenixConfigurationUtil.setIndexToolLastVerifyTime(configuration, lastVerifyTime);
          }
          return configureJobForServerBuildIndex();
        }
      }
    }

    private Job configureJobForPartialBuild() throws Exception {
      connection = ConnectionUtil.getInputConnection(configuration);
      long minDisableTimestamp = HConstants.LATEST_TIMESTAMP;
      PTable indexWithMinDisableTimestamp = null;

      // Get indexes in BUILDING state and the minimum disabled timestamp
      List<String> disableIndexes = new ArrayList<String>();
      List<PTable> disabledPIndexes = new ArrayList<PTable>();
      for (PTable index : pDataTable.getIndexes()) {
        if (index.getIndexState().equals(PIndexState.BUILDING)) {
          disableIndexes.add(index.getTableName().getString());
          disabledPIndexes.add(index);
          // We need a way of differentiating the block writes to data table case from
          // the leave index active case. In either case, we need to know the time stamp
          // at which writes started failing so we can rebuild from that point. If we
          // keep the index active *and* have a positive INDEX_DISABLE_TIMESTAMP_BYTES,
          // then writes to the data table will be blocked (this is client side logic
          // and we can't change this in a minor release). So we use the sign of the
          // time stamp to differentiate.
          long indexDisableTimestamp = Math.abs(index.getIndexDisableTimestamp());
          if (minDisableTimestamp > indexDisableTimestamp) {
            minDisableTimestamp = indexDisableTimestamp;
            indexWithMinDisableTimestamp = index;
          }
        }
      }

      if (indexWithMinDisableTimestamp == null) {
        throw new Exception("There is no index to be rebuilt for data table: " + qDataTable);
      }
      if (minDisableTimestamp == 0) {
        throw new Exception("It seems index " + indexWithMinDisableTimestamp
          + " has a disable timestamp of 0; please run IndexTool with the index name to build it first");
        // TODO we could probably initiate the job ourselves, or skip such indexes with a warning
        // while building the list for the partial build
      }

      long maxTimestamp = getMaxRebuildAsyncDate(schemaName, disableIndexes);

      // serialize index maintainers in job conf with Base64. TODO: Need to find a better way to
      // serialize them in conf.
      List<IndexMaintainer> maintainers =
        Lists.newArrayListWithExpectedSize(disabledPIndexes.size());
      for (PTable index : disabledPIndexes) {
        maintainers
          .add(index.getIndexMaintainer(pDataTable, connection.unwrap(PhoenixConnection.class)));
      }
      ImmutableBytesWritable indexMetaDataPtr =
        new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY);
      IndexMaintainer.serializeAdditional(pDataTable, indexMetaDataPtr, disabledPIndexes,
        connection.unwrap(PhoenixConnection.class));
      PhoenixConfigurationUtil.setIndexMaintainers(configuration, indexMetaDataPtr);
      if (!Strings.isNullOrEmpty(tenantId)) {
        PhoenixConfigurationUtil.setTenantId(configuration, tenantId);
      }

      // Prepare raw scan
      Scan scan = IndexManagementUtil.newLocalStateScan(maintainers);
      scan.setTimeRange(minDisableTimestamp - 1, maxTimestamp);
      scan.setRaw(true);
      scan.setCacheBlocks(false);
      if (pDataTable.isTransactional()) {
        long maxTimeRange = pDataTable.getTimeStamp() + 1;
        scan.setAttribute(BaseScannerRegionObserverConstants.TX_SCN,
          Bytes.toBytes(TransactionUtil.convertToNanoseconds(maxTimeRange)));
      }

      String physicalTableName = pDataTable.getPhysicalName().getString();
      final String jobName =
        String.format("Phoenix Indexes build for %s", pDataTable.getName().toString());

      PhoenixConfigurationUtil.setInputTableName(configuration, dataTableWithSchema);
      PhoenixConfigurationUtil.setPhysicalTableName(configuration, physicalTableName);

      // TODO: update disable indexes
      PhoenixConfigurationUtil.setDisableIndexes(configuration,
        StringUtils.join(",", disableIndexes));

      final Job job = Job.getInstance(configuration, jobName);
      if (outputPath != null) {
        FileOutputFormat.setOutputPath(job, outputPath);
      }
      job.setJarByClass(IndexTool.class);
      TableMapReduceUtil.initTableMapperJob(physicalTableName, scan,
        PhoenixIndexPartialBuildMapper.class, null, null, job);
      TableMapReduceUtil.initCredentials(job);
      TableInputFormat.configureSplitTable(job, TableName.valueOf(physicalTableName));
      return configureSubmittableJobUsingDirectApi(job);
    }

    private long getMaxRebuildAsyncDate(String schemaName, List<String> disableIndexes)
      throws SQLException {
      Long maxRebuildAsyncDate = HConstants.LATEST_TIMESTAMP;
      Long maxDisabledTimeStamp = 0L;
      if (disableIndexes == null || disableIndexes.isEmpty()) {
        return 0;
      }
      String query = String.format(
        "SELECT MAX(" + ASYNC_REBUILD_TIMESTAMP + "), " + "MAX(" + INDEX_DISABLE_TIMESTAMP
          + ") FROM " + SYSTEM_CATALOG_NAME + " (" + ASYNC_REBUILD_TIMESTAMP + " BIGINT) WHERE "
          + TABLE_SCHEM + " %s  AND " + TABLE_NAME + " IN ( %s )",
        (schemaName != null && schemaName.length() > 0) ? " = ? " : " IS NULL ",
        QueryUtil.generateInListParams(disableIndexes.size()));
      try (PreparedStatement selSyscat = connection.prepareStatement(query)) {
        int param = 0;
        if (schemaName != null && schemaName.length() > 0) {
          selSyscat.setString(++param, schemaName);
        }
        QueryUtil.setQuoteInListElements(selSyscat, disableIndexes, param);
        ResultSet rs = selSyscat.executeQuery();
        if (rs.next()) {
          maxRebuildAsyncDate = rs.getLong(1);
          maxDisabledTimeStamp = rs.getLong(2);
        }
        // Check whether any index was disabled again after the user invoked async rebuilding,
        // while the job was running
        if (maxRebuildAsyncDate > maxDisabledTimeStamp) {
          return maxRebuildAsyncDate;
        } else {
          throw new RuntimeException("Inconsistent state: one or more index tables were disabled "
            + "after the async rebuild was requested!");
        }
      }
    }

    private Job configureJobForAsyncIndex() throws Exception {
      String physicalIndexTable = pIndexTable.getPhysicalName().getString();
      final PhoenixConnection pConnection = connection.unwrap(PhoenixConnection.class);
      final PostIndexDDLCompiler ddlCompiler =
        new PostIndexDDLCompiler(pConnection, new TableRef(pDataTable));
      ddlCompiler.compile(pIndexTable);
      final List<String> indexColumns = ddlCompiler.getIndexColumnNames();
      final String selectQuery = ddlCompiler.getSelectQuery();
      final String upsertQuery =
        QueryUtil.constructUpsertStatement(indexTableWithSchema, indexColumns, Hint.NO_INDEX);

      configuration.set(PhoenixConfigurationUtil.UPSERT_STATEMENT, upsertQuery);
      PhoenixConfigurationUtil.setPhysicalTableName(configuration, physicalIndexTable);
      PhoenixConfigurationUtil.setIndexToolIndexTableName(configuration, qIndexTable);
      PhoenixConfigurationUtil.setDisableIndexes(configuration, indexTable);

      PhoenixConfigurationUtil.setUpsertColumnNames(configuration,
        indexColumns.toArray(new String[indexColumns.size()]));
      if (tenantId != null) {
        PhoenixConfigurationUtil.setTenantId(configuration, tenantId);
      }
      final List<ColumnInfo> columnMetadataList =
        PhoenixRuntime.generateColumnInfo(pConnection, indexTableWithSchema, indexColumns);
      ColumnInfoToStringEncoderDecoder.encode(configuration, columnMetadataList);

      if (outputPath != null) {
        fs = outputPath.getFileSystem(configuration);
        fs.delete(outputPath, true);
      }
      final String jobName =
        String.format(INDEX_JOB_NAME_TEMPLATE, schemaName, dataTable, indexTable);
      final Job job = Job.getInstance(configuration, jobName);
      job.setJarByClass(IndexTool.class);
      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
      if (outputPath != null) {
        FileOutputFormat.setOutputPath(job, outputPath);
      }

      if (!useSnapshot) {
        PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, dataTableWithSchema,
          selectQuery);
      } else {
        Admin admin = null;
        String snapshotName;
        try {
          admin = pConnection.getQueryServices().getAdmin();
          TableName hDataTableName = TableName.valueOf(pDataTable.getPhysicalName().getBytes());
          snapshotName = new StringBuilder("INDEXTOOL-").append(pDataTable.getName().getString())
            .append("-Snapshot-").append(System.currentTimeMillis()).toString();
          // FIXME Drop this snapshot after we're done?
          admin.snapshot(snapshotName, hDataTableName);
        } finally {
          if (admin != null) {
            admin.close();
          }
        }
        // the snapshot root dir must not be a subdirectory of the hbase root dir
        Path rootDir = new Path("hdfs:///index-snapshot-dir");
        CommonFSUtils.setRootDir(configuration, rootDir);
        Path restoreDir = new Path(CommonFSUtils.getRootDir(configuration), "restore-dir");

        // set input for map reduce job using hbase snapshots
        PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, snapshotName,
          dataTableWithSchema, restoreDir, selectQuery);
      }
      TableMapReduceUtil.initCredentials(job);

      job.setMapperClass(PhoenixIndexImportDirectMapper.class);
      return configureSubmittableJobUsingDirectApi(job);
    }

    private Job configureJobForServerBuildIndex() throws Exception {
      long indexRebuildQueryTimeoutMs =
        configuration.getLong(QueryServices.INDEX_REBUILD_QUERY_TIMEOUT_ATTRIB,
          QueryServicesOptions.DEFAULT_INDEX_REBUILD_QUERY_TIMEOUT);
      long indexRebuildRPCTimeoutMs =
        configuration.getLong(QueryServices.INDEX_REBUILD_RPC_TIMEOUT_ATTRIB,
          QueryServicesOptions.DEFAULT_INDEX_REBUILD_RPC_TIMEOUT);
      long indexRebuildClientScannerTimeOutMs =
        configuration.getLong(QueryServices.INDEX_REBUILD_CLIENT_SCANNER_TIMEOUT_ATTRIB,
          QueryServicesOptions.DEFAULT_INDEX_REBUILD_CLIENT_SCANNER_TIMEOUT);
      int indexRebuildRpcRetriesCounter =
        configuration.getInt(QueryServices.INDEX_REBUILD_RPC_RETRIES_COUNTER,
          QueryServicesOptions.DEFAULT_INDEX_REBUILD_RPC_RETRIES_COUNTER);
      // Set various phoenix and hbase level timeouts and rpc retries
      configuration.set(QueryServices.THREAD_TIMEOUT_MS_ATTRIB,
        Long.toString(indexRebuildQueryTimeoutMs));
      configuration.set(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
        Long.toString(indexRebuildClientScannerTimeOutMs));
      configuration.set(HConstants.HBASE_RPC_TIMEOUT_KEY, Long.toString(indexRebuildRPCTimeoutMs));
      configuration.set(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
        Long.toString(indexRebuildRpcRetriesCounter));
      configuration.set("mapreduce.task.timeout", Long.toString(indexRebuildQueryTimeoutMs));

      // Randomize execution order, unless explicitly set
      configuration.setBooleanIfUnset(
        PhoenixConfigurationUtil.MAPREDUCE_RANDOMIZE_MAPPER_EXECUTION_ORDER, true);

      PhoenixConfigurationUtil.setIndexToolDataTableName(configuration, dataTableWithSchema);
      PhoenixConfigurationUtil.setIndexToolIndexTableName(configuration, qIndexTable);
      PhoenixConfigurationUtil.setIndexToolSourceTable(configuration, sourceTable);
      if (startTime != null) {
        PhoenixConfigurationUtil.setIndexToolStartTime(configuration, startTime);
      }
      PhoenixConfigurationUtil.setIndexVerifyType(configuration, indexVerifyType);
      PhoenixConfigurationUtil.setDisableLoggingVerifyType(configuration, disableLoggingType);
      String physicalIndexTable = pIndexTable.getPhysicalName().getString();

      PhoenixConfigurationUtil.setPhysicalTableName(configuration, physicalIndexTable);
      PhoenixConfigurationUtil.setDisableIndexes(configuration, indexTable);
      if (tenantId != null) {
        PhoenixConfigurationUtil.setTenantId(configuration, tenantId);
      }

      if (outputPath != null) {
        fs = outputPath.getFileSystem(configuration);
        fs.delete(outputPath, true);
      }
      final String jobName =
        String.format(INDEX_JOB_NAME_TEMPLATE, schemaName, dataTable, indexTable);
      final Job job = Job.getInstance(configuration, jobName);
      job.setJarByClass(IndexTool.class);
      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
      if (outputPath != null) {
        FileOutputFormat.setOutputPath(job, outputPath);
      }

      PhoenixMapReduceUtil.setInput(job, PhoenixServerBuildIndexDBWritable.class,
        PhoenixServerBuildIndexInputFormat.class, dataTableWithSchema, "");

      TableMapReduceUtil.initCredentials(job);
      job.setMapperClass(PhoenixServerBuildIndexMapper.class);
      return configureSubmittableJobUsingDirectApi(job);
    }

    /**
     * Uses the HBase Front Door Api to write to index table. Submits the job and either returns or
     * waits for the job completion based on runForeground parameter.
     */
    private Job configureSubmittableJobUsingDirectApi(Job job) throws Exception {
      job.setReducerClass(PhoenixIndexImportDirectReducer.class);
      Configuration conf = job.getConfiguration();
      HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
      // Set the Physical Table name for use in DirectHTableWriter#write(Mutation)
      conf.set(TableOutputFormat.OUTPUT_TABLE,
        PhoenixConfigurationUtil.getPhysicalTableName(job.getConfiguration()));
      // Set the Output classes
      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
      job.setMapOutputValueClass(IntWritable.class);
      job.setOutputKeyClass(NullWritable.class);
      job.setOutputValueClass(NullWritable.class);
      TableMapReduceUtil.addDependencyJars(job);
      job.setNumReduceTasks(1);
      return job;
    }

  }

  public Job getJob() {
    return job;
  }

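  /**
   * Ensures that the IndexTool verification result and output system tables exist, creating them
   * if needed, before a verification job attempts to write to them.
   */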
  public static void createIndexToolTables(Connection connection) throws Exception {
    try (IndexVerificationResultRepository resultRepo = new IndexVerificationResultRepository();
      IndexVerificationOutputRepository outputRepo = new IndexVerificationOutputRepository()) {
      resultRepo.createResultTable(connection);
      outputRepo.createOutputTable(connection);
    }
  }

  @Override
  public int run(String[] args) throws Exception {
    CommandLine cmdLine;
    try {
      cmdLine = parseOptions(args);
    } catch (IllegalStateException e) {
      printHelpAndExit(e.getMessage(), getOptions());
      return -1;
    }
    configuration = HBaseConfiguration.addHbaseResources(getConf());
    populateIndexToolAttributes(cmdLine);

    if (tenantId != null) {
      configuration.set(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
    }

    try (Connection conn = getConnection(configuration)) {
      createIndexToolTables(conn);
      if (dataTable != null && indexTable != null) {
        setupIndexAndDataTable(conn);
        checkIfFeatureApplicable(startTime, endTime, lastVerifyTime, pDataTable, isLocalIndexBuild);
        if (shouldDeleteBeforeRebuild) {
          deleteBeforeRebuild(conn);
        }
        preSplitIndexTable(cmdLine, conn);
      }

      boolean result = submitIndexToolJob(conn, configuration);

      if (result) {
        return 0;
      } else {
        LOGGER.error("IndexTool job failed! Check logs for errors..");
        return -1;
      }
    } catch (Exception ex) {
      LOGGER.error("An exception occurred while performing the indexing job: "
        + ExceptionUtils.getMessage(ex) + " at:\n" + ExceptionUtils.getStackTrace(ex));
      return -1;
    }
  }

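  /**
   * Validates that the start-time/end-time and retry-verify options are only used where they are
   * supported (local index builds, or indexes on non-transactional tables); throws otherwise.
   */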
  public static void checkIfFeatureApplicable(Long startTime, Long endTime, Long lastVerifyTime,
    PTable pDataTable, boolean isLocalIndexBuild) {
    boolean isApplicable = isFeatureApplicable(pDataTable, isLocalIndexBuild);
    if (!isApplicable) {
      if (isTimeRangeSet(startTime, endTime) || lastVerifyTime != null) {
        throw new RuntimeException(FEATURE_NOT_APPLICABLE);
      }
    }
  }

  private boolean submitIndexToolJob(Connection conn, Configuration configuration)
    throws Exception {
    Path outputPath = null;
    FileSystem fs;
    if (basePath != null) {
      outputPath = CsvBulkImportUtil.getOutputPath(new Path(basePath),
        pIndexTable == null
          ? pDataTable.getPhysicalName().getString()
          : pIndexTable.getPhysicalName().getString());
      fs = outputPath.getFileSystem(configuration);
      fs.delete(outputPath, true);
    }
    JobFactory jobFactory = new JobFactory(conn, configuration, outputPath);
    job = jobFactory.getJob();
    if (!isForeground) {
      LOGGER.info("Running Index Build in Background - Submit async and exit");
      job.submit();
      return true;
    }
    LOGGER.info("Running Index Build in Foreground. Waits for the build to complete."
      + " This may take a long time!.");
    return job.waitForCompletion(true);
  }

  @VisibleForTesting
  public int populateIndexToolAttributes(CommandLine cmdLine) throws Exception {
    boolean useTenantId = cmdLine.hasOption(TENANT_ID_OPTION.getOpt());
    boolean useStartTime = cmdLine.hasOption(START_TIME_OPTION.getOpt());
    boolean useEndTime = cmdLine.hasOption(END_TIME_OPTION.getOpt());
    boolean retryVerify = cmdLine.hasOption(RETRY_VERIFY_OPTION.getOpt());
    boolean verify = cmdLine.hasOption(VERIFY_OPTION.getOpt());
    boolean disableLogging = cmdLine.hasOption(DISABLE_LOGGING_OPTION.getOpt());
    boolean useIndexTableAsSource = cmdLine.hasOption(USE_INDEX_TABLE_AS_SOURCE_OPTION.getOpt());

    if (useTenantId) {
      tenantId = cmdLine.getOptionValue(TENANT_ID_OPTION.getOpt());
    }
    if (useStartTime) {
      startTime = Long.valueOf(cmdLine.getOptionValue(START_TIME_OPTION.getOpt()));
    }
    if (useEndTime) {
      endTime = Long.valueOf(cmdLine.getOptionValue(END_TIME_OPTION.getOpt()));
    }
    if (retryVerify) {
      lastVerifyTime = Long.valueOf(cmdLine.getOptionValue(RETRY_VERIFY_OPTION.getOpt()));
      validateLastVerifyTime();
    }
    if (isTimeRangeSet(startTime, endTime)) {
      validateTimeRange(startTime, endTime);
    }
    if (verify) {
      String value = cmdLine.getOptionValue(VERIFY_OPTION.getOpt());
      indexVerifyType = IndexVerifyType.fromValue(value);
      if (disableLogging) {
        disableLoggingType = IndexDisableLoggingType
          .fromValue(cmdLine.getOptionValue(DISABLE_LOGGING_OPTION.getOpt()));
      }
    }

    if (useIndexTableAsSource) {
      sourceTable = SourceTable.INDEX_TABLE_SOURCE;
    }

    schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt());
    dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt());
    indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt());
    isPartialBuild = cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt());
    dataTableWithSchema = SchemaUtil.getQualifiedPhoenixTableName(schemaName, dataTable);
    indexTableWithSchema = SchemaUtil.getQualifiedPhoenixTableName(schemaName, indexTable);
    qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable);
    basePath = cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt());
    isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
    useSnapshot = cmdLine.hasOption(SNAPSHOT_OPTION.getOpt());
    shouldDeleteBeforeRebuild = cmdLine.hasOption(DELETE_ALL_AND_REBUILD_OPTION.getOpt());
    return 0;
  }

  public int validateLastVerifyTime() throws Exception {
    Long currentTime = EnvironmentEdgeManager.currentTimeMillis();
    if (
      lastVerifyTime.compareTo(currentTime) > 0 || lastVerifyTime == 0L
        || !isValidLastVerifyTime(lastVerifyTime)
    ) {
      throw new RuntimeException(RETRY_VERIFY_NOT_APPLICABLE);
    }
    return 0;
  }

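  /**
   * Checks whether the given timestamp corresponds to an existing entry in the IndexTool result
   * table by scanning for row keys prefixed with the timestamp and the physical index table name.
   */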
  public boolean isValidLastVerifyTime(Long lastVerifyTime) throws Exception {
    try (Connection conn = getConnection(configuration);
      Table hIndexToolTable = conn.unwrap(PhoenixConnection.class).getQueryServices()
        .getTable(IndexVerificationResultRepository.RESULT_TABLE_NAME_BYTES)) {
      Scan s = new Scan();
      ConnectionQueryServices cqs = conn.unwrap(PhoenixConnection.class).getQueryServices();
      boolean isNamespaceMapped = SchemaUtil.isNamespaceMappingEnabled(null, cqs.getProps());
      s.setRowPrefixFilter(Bytes.toBytes(String.format("%s%s%s", lastVerifyTime, ROW_KEY_SEPARATOR,
        SchemaUtil.getPhysicalHBaseTableName(qSchemaName,
          SchemaUtil.normalizeIdentifier(indexTable), isNamespaceMapped))));
      try (ResultScanner rs = hIndexToolTable.getScanner(s)) {
        return rs.next() != null;
      }
    }
  }

  public static void validateTimeRange(Long sTime, Long eTime) {
    Long currentTime = EnvironmentEdgeManager.currentTimeMillis();
    Long st = (sTime == null) ? 0 : sTime;
    Long et = (eTime == null) ? currentTime : eTime;
    if (st.compareTo(currentTime) > 0 || et.compareTo(currentTime) > 0 || st.compareTo(et) >= 0) {
      throw new RuntimeException(INVALID_TIME_RANGE_EXCEPTION_MESSAGE);
    }
  }

  private Connection getConnection(Configuration configuration) throws SQLException {
    return ConnectionUtil.getInputConnection(configuration);
  }

  private void setupIndexAndDataTable(Connection connection) throws SQLException, IOException {
    pDataTable = connection.unwrap(PhoenixConnection.class).getTableNoCache(qDataTable);
    if (!isValidIndexTable(connection, qDataTable, indexTable, tenantId)) {
      throw new IllegalArgumentException(String
        .format(" %s is not an index table for %s for this connection", indexTable, qDataTable));
    }
    qSchemaName = SchemaUtil.normalizeIdentifier(schemaName);
    pIndexTable = connection.unwrap(PhoenixConnection.class)
      .getTable(SchemaUtil.getQualifiedTableName(schemaName, indexTable));
    indexType = pIndexTable.getIndexType();
    qIndexTable = SchemaUtil.getQualifiedTableName(schemaName, indexTable);
    if (IndexType.LOCAL.equals(indexType)) {
      isLocalIndexBuild = true;
      if (useSnapshot) {
        throw new IllegalArgumentException(String.format(
          "%s is a local index. Snapshots are not supported for local indexes.", qIndexTable));
      }
      try (org.apache.hadoop.hbase.client.Connection hConn =
        getTemporaryHConnection(connection.unwrap(PhoenixConnection.class))) {
        RegionLocator regionLocator =
          hConn.getRegionLocator(TableName.valueOf(pIndexTable.getPhysicalName().getBytes()));
        splitKeysBeforeJob = regionLocator.getStartKeys();
      }
    }
    // We have to move a disabled index to BUILDING before the reducer can set it to ACTIVE;
    // otherwise it errors out with an index state transition error.
    changeDisabledIndexStateToBuilding(connection);
  }

  public static boolean isTimeRangeSet(Long startTime, Long endTime) {
    return startTime != null || endTime != null;
  }

  private static boolean isFeatureApplicable(PTable dataTable, boolean isLocalIndexBuild) {
    if (isLocalIndexBuild || !dataTable.isTransactional()) {
      return true;
    }
    return false;
  }

  private void changeDisabledIndexStateToBuilding(Connection connection) throws SQLException {
    if (pIndexTable != null && pIndexTable.getIndexState().isDisabled()) {
      IndexUtil.updateIndexState(connection.unwrap(PhoenixConnection.class),
        pIndexTable.getName().getString(), PIndexState.BUILDING, null);
    }
  }

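  /**
   * Pre-splits the index table before the build when the -spa or -sp option is supplied. Only
   * applies to unsalted global indexes; salted tables are skipped since their bucket prefix
   * already distributes the writes.
   */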
  private void preSplitIndexTable(CommandLine cmdLine, Connection connection)
    throws SQLException, IOException {
    boolean autosplit = cmdLine.hasOption(AUTO_SPLIT_INDEX_OPTION.getOpt());
    boolean splitIndex = cmdLine.hasOption(SPLIT_INDEX_OPTION.getOpt());
    boolean isSalted = pIndexTable.getBucketNum() != null; // no need to split salted tables
    if (
      !isSalted
        && (IndexType.GLOBAL.equals(indexType) || IndexType.UNCOVERED_GLOBAL.equals(indexType))
        && (autosplit || splitIndex)
    ) {
      String nOpt = cmdLine.getOptionValue(AUTO_SPLIT_INDEX_OPTION.getOpt());
      int autosplitNumRegions =
        nOpt == null ? DEFAULT_AUTOSPLIT_NUM_REGIONS : Integer.parseInt(nOpt);
      String rateOpt = cmdLine.getOptionValue(SPLIT_INDEX_OPTION.getOpt());
      double samplingRate =
        rateOpt == null ? DEFAULT_SPLIT_SAMPLING_RATE : Double.parseDouble(rateOpt);
      LOGGER.info(String.format(
        "Will split index %s, autosplit=%s, autoSplitNumRegions=%s, samplingRate=%s", indexTable,
        autosplit, autosplitNumRegions, samplingRate));

      splitIndexTable(connection.unwrap(PhoenixConnection.class), autosplit, autosplitNumRegions,
        samplingRate);
    }
  }

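  /**
   * Truncates the physical index table so the rebuild starts from empty. Not supported for view
   * indexes or local indexes, which share their physical table with other data.
   */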
  private void deleteBeforeRebuild(Connection conn) throws SQLException, IOException {
    if (MetaDataUtil.isViewIndex(pIndexTable.getPhysicalName().getString())) {
      throw new IllegalArgumentException(String.format(
        "%s is a view index. delete-all-and-rebuild is not supported for view indexes",
        qIndexTable));
    }

    if (isLocalIndexBuild) {
      throw new IllegalArgumentException(String.format(
        "%s is a local index. delete-all-and-rebuild is not supported for local indexes",
        qIndexTable));
    } else {
      ConnectionQueryServices queryServices =
        conn.unwrap(PhoenixConnection.class).getQueryServices();
      try (Admin admin = queryServices.getAdmin()) {
        TableName tableName = TableName.valueOf(qIndexTable);
        admin.disableTable(tableName);
        admin.truncateTable(tableName, true);
      }
    }
  }

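  /**
   * Drops and recreates the index table with split points computed from an equi-depth stream
   * histogram over index row keys, which are built from a sample of the data table, so the
   * rebuilt index regions receive a roughly even share of rows. Any existing index data is
   * deleted in the process.
   */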
  private void splitIndexTable(PhoenixConnection pConnection, boolean autosplit,
    int autosplitNumRegions, double samplingRate)
    throws SQLException, IOException, IllegalArgumentException {
    int numRegions;

    TableName hDataName = TableName.valueOf(pDataTable.getPhysicalName().getBytes());
    try (org.apache.hadoop.hbase.client.Connection tempHConn = getTemporaryHConnection(pConnection);
      RegionLocator regionLocator = tempHConn.getRegionLocator(hDataName)) {
      numRegions = regionLocator.getStartKeys().length;
      if (autosplit && (numRegions <= autosplitNumRegions)) {
        LOGGER.info(String.format(
          "Will not split index %s because the data table only has %s regions, autoSplitNumRegions=%s",
          pIndexTable.getPhysicalName(), numRegions, autosplitNumRegions));
        return; // do nothing if # of regions is too low
      }
    }
    // build a tablesample query to fetch index column values from the data table
    DataSourceColNames colNames = new DataSourceColNames(pDataTable, pIndexTable);
    String qTableSample = String.format("%s TABLESAMPLE(%.2f)", qDataTable, samplingRate);
    List<String> dataColNames = colNames.getDataColNames();
    final String dataSampleQuery =
      QueryUtil.constructSelectStatement(qTableSample, dataColNames, null, Hint.NO_INDEX, true);
    IndexMaintainer maintainer = IndexMaintainer.create(pDataTable, pIndexTable, pConnection);
    ImmutableBytesWritable dataRowKeyPtr = new ImmutableBytesWritable();
    try (
      final PhoenixResultSet rs =
        pConnection.createStatement().executeQuery(dataSampleQuery).unwrap(PhoenixResultSet.class);
      Admin admin = pConnection.getQueryServices().getAdmin()) {
      EquiDepthStreamHistogram histo = new EquiDepthStreamHistogram(numRegions);
      ValueGetter getter = getIndexValueGetter(rs, dataColNames);
      // loop over data table rows - build the index rowkey, put it in the histogram
      while (rs.next()) {
        rs.getCurrentRow().getKey(dataRowKeyPtr);
        // regionStart/EndKey only needed for local indexes, so we pass null
        byte[] indexRowKey = maintainer.buildRowKey(getter, dataRowKeyPtr, null, null,
          rs.getCurrentRow().getValue(0).getTimestamp());
        histo.addValue(indexRowKey);
      }
      List<Bucket> buckets = histo.computeBuckets();
      // do the split
      // to get the splits, we just need the right bound of every histogram bucket, excluding the
      // last
      byte[][] splitPoints = new byte[buckets.size() - 1][];
      int splitIdx = 0;
      for (Bucket b : buckets.subList(0, buckets.size() - 1)) {
        splitPoints[splitIdx++] = b.getRightBoundExclusive();
      }
      // drop table and recreate with appropriate splits
      TableName hIndexName = TableName.valueOf(pIndexTable.getPhysicalName().getBytes());
      TableDescriptor descriptor = admin.getDescriptor(hIndexName);
      admin.disableTable(hIndexName);
      admin.deleteTable(hIndexName);
      admin.createTable(descriptor, splitPoints);
    }
  }

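  /**
   * Creates a standalone HBase connection from the Phoenix connection's admin configuration.
   * Callers are responsible for closing the returned connection.
   */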
  private org.apache.hadoop.hbase.client.Connection
    getTemporaryHConnection(PhoenixConnection pConnection) throws SQLException, IOException {
    try (Admin admin = pConnection.getQueryServices().getAdmin()) {
      return ConnectionFactory.createConnection(admin.getConfiguration());
    }
  }

  // set up a ValueGetter to get index values from the ResultSet
  public static ValueGetter getIndexValueGetter(final PhoenixResultSet rs,
    List<String> dataColNames) {
    // map from data col name to index in ResultSet
    final Map<String, Integer> rsIndex = new HashMap<>(dataColNames.size());
    int i = 1;
    for (String dataCol : dataColNames) {
      rsIndex.put(SchemaUtil.getEscapedFullColumnName(dataCol), i++);
    }
    return new AbstractValueGetter() {
      final ImmutableBytesWritable valuePtr = new ImmutableBytesWritable();
      final ImmutableBytesWritable rowKeyPtr = new ImmutableBytesWritable();

      @Override
      public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts)
        throws IOException {
        try {
          String fullColumnName = SchemaUtil.getEscapedFullColumnName(
            SchemaUtil.getColumnDisplayName(ref.getFamily(), ref.getQualifier()));
          byte[] colVal = rs.getBytes(rsIndex.get(fullColumnName));
          valuePtr.set(colVal);
        } catch (SQLException e) {
          throw new IOException(e);
        }
        return valuePtr;
      }

      @Override
      public byte[] getRowKey() {
        rs.getCurrentRow().getKey(rowKeyPtr);
        return ByteUtil.copyKeyBytesIfNecessary(rowKeyPtr);
      }
    };
  }

  /**
   * Checks for the validity of the index table passed to the job.
   */
  public static boolean isValidIndexTable(final Connection connection, final String masterTable,
    final String indexTable, final String tenantId) throws SQLException {
    final DatabaseMetaData dbMetaData = connection.getMetaData();
    final String schemaName = SchemaUtil.getSchemaNameFromFullName(masterTable);
    final String tableName = SchemaUtil.getTableNameFromFullName(masterTable);

    ResultSet rs = null;
    try {
      String catalog = "";
      if (tenantId != null) {
        catalog = tenantId;
      }
      rs = dbMetaData.getIndexInfo(catalog, schemaName, tableName, false, false);
      while (rs.next()) {
        final String indexName = rs.getString(6);
        if (SchemaUtil.normalizeIdentifier(indexTable).equalsIgnoreCase(indexName)) {
          return true;
        }
      }
    } finally {
      if (rs != null) {
        rs.close();
      }
    }
    return false;
  }

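  /**
   * Programmatic entry point: builds the equivalent command line from the given parameters, runs
   * the tool, and returns the exit status together with the submitted MR job.
   */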
  public static Map.Entry<Integer, Job> run(Configuration conf, String schemaName, String dataTable,
    String indexTable, boolean useSnapshot, String tenantId, boolean disableBefore,
    boolean shouldDeleteBeforeRebuild, boolean runForeground) throws Exception {
    final List<String> args = Lists.newArrayList();
    if (schemaName != null) {
      args.add("--schema=" + schemaName);
    }
    // Work around CLI-254. The long-form arg parsing doesn't strip off double-quotes
    args.add("--data-table=" + dataTable);
    args.add("--index-table=" + indexTable);

    if (runForeground) {
      args.add("-runfg");
    }

    if (useSnapshot) {
      args.add("-snap");
    }

    if (tenantId != null) {
      args.add("-tenant");
      args.add(tenantId);
    }

    if (shouldDeleteBeforeRebuild) {
      args.add("-deleteall");
    }

    args.add("-op");
    args.add("/tmp/" + UUID.randomUUID().toString());

    if (disableBefore) {
      PhoenixConfigurationUtil.setDisableIndexes(conf, indexTable);
    }

    IndexTool indexingTool = new IndexTool();
    indexingTool.setConf(conf);
    int status = indexingTool.run(args.toArray(new String[0]));
    Job job = indexingTool.getJob();
    return new AbstractMap.SimpleEntry<>(status, job);
  }

  public static void main(final String[] args) throws Exception {
    int result = ToolRunner.run(new IndexTool(), args);
    System.exit(result);
  }

}
